package com.edu.hbade.test.device.log.query;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URLEncoder;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.poi.xssf.usermodel.XSSFCell;
import org.apache.poi.xssf.usermodel.XSSFRow;
import org.apache.poi.xssf.usermodel.XSSFSheet;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;





public class HbaseUtil {
	private static Configuration conf = null;
	private static HTablePool tp = null;
	
	private static String hbaseTableName = "ota_pre_record";
	private static String family1 = "info";
	private static String qualifier1="check_time";
	private static String qualifier2="down_time";
	private static String qualifier3="up_time";
	private static String qualifier4="down_fail_time";
	private static String qualifier5="down_fail_status";
	private static String qualifier6="up_fail_time";
	private static String qualifier7="up_fail_status";
	private static String qualifier8="status";

	static {
		// 加载集群配置
		conf = HBaseConfiguration.create();
		conf.set("hbase.rootdir","hdfs://192.168.1.20:9000/hbase");
		conf.set("hbase.zookeeper.quorum", "192.168.1.10:2181,192.168.1.11:2181,192.168.1.12:2181");
		conf.set("hbase.master","192.168.1.20:60000");
		// 创建表池(可伟略提高查询性能，具体说明请百度或官方API)
		tp = new HTablePool(conf, 10);
	}
	
	public static void main(String[] args) throws IOException {
  
		int currentPage = 1;
		int pageSize = 20;
		//String rowKeyPrefix = "22222+88888+123";
		String rowKey = "22222\\+88888\\+.*";
		String checkTimeStart = "2017-12-01 10:00:00";
		String checkTimeEnd = "2017-12-01 11:00:00";
		String status = "";//0:所有 1:检测成功 2:下载失败 3:下载成功 4:升级失败 5:升级成功
		// 执行hbase分页查询
		//getPageData(hbaseTableName, rowKey, status, checkTimeStart, checkTimeEnd, currentPage, pageSize);
		
		//执行hbase分批导出
		batchExport(hbaseTableName, rowKey, status, checkTimeStart, checkTimeEnd);
		
	}
	
	
	public static void batchExport(String tableName,  String rowKey,String status,String checkTimeStart,String checkTimeEnd)
			throws IOException {
		
		ResultScanner scanner = null;
		FileOutputStream fos = null;
		try{

			// 从表池中取出HBASE表对象
			HTableInterface table = getTable(tableName);
			// 获取筛选对象
			Scan scan = new Scan();
			//给筛选对象放入过滤器(true标识分页)
			FilterList fl = packageFilters(true,rowKey,status,checkTimeStart,checkTimeEnd);
			scan.setFilter(fl);
			// 缓存1000条数据
			scan.setCaching(1000);
			scan.setCacheBlocks(false);
			scanner = table.getScanner(scan);
			
			List<byte[]> rowList = new LinkedList<byte[]>();//获得所有的rowKey的二进制码
			// 遍历扫描器对象， 并将需要查询出来的数据row key取出
			for (Result result : scanner) {//result中只有rowKey和check_time,没有其他的列,所以不能在这里封装所有的结果。只有知道具体的rowkey，才能从result中获取值
				String row = toStr(result.getRow());
				rowList.add(getBytes(row));
			}
			int totalRowNum = rowList.size();
			
			//要写入的文件
			File file = new File("D:\\hbaseExport.xlsx");
	        fos = new FileOutputStream(file,true);	
			//建立excel
			String[] columns = {"mid", qualifier1, qualifier2, qualifier3,qualifier4,qualifier5,qualifier6,qualifier7,qualifier8}; 
			XSSFWorkbook workbook=new XSSFWorkbook();
	        XSSFSheet sheet=workbook.createSheet("result");//set sheetname
	        XSSFRow row = null;
	        XSSFCell cell=null; 
	        //表头
	        row = sheet.createRow(0);
	        for(int i = 1 ; i<= columns.length ; i++){  	
	        	cell=row.createCell(i-1);
	            cell.setCellValue(columns[i-1]);
	        }         
			
			// 获取取出的row key的GET对象
			List<Get> getList = getList(rowList);
			Result[] results = table.get(getList);
			// 遍历结果
			List<HbaseOtaPreRecordEntity> beanList  = new LinkedList<HbaseOtaPreRecordEntity>();
			for (Result result : results) {
				String tmpRowKey = toStr(result.getRow());
				Map<byte[], byte[]> fmap = packFamilyMap(result);
				HbaseOtaPreRecordEntity bean = packRowMap(fmap,tmpRowKey);
				beanList.add(bean);
			}

		    //插入数据到Excel
	        for(int rowNum = 1; rowNum <= beanList.size() ; rowNum++){
	        	row=sheet.createRow(rowNum);
	        	HbaseOtaPreRecordEntity bean = beanList.get(rowNum-1);
	        	
	            row.createCell(0).setCellValue(bean.getRowKey());            
	            row.createCell(1).setCellValue(bean.getCheckTime() == null? "" : bean.getCheckTime());            
	            row.createCell(2).setCellValue(bean.getDownTime() == null? "" : bean.getDownTime());            
	            row.createCell(3).setCellValue(bean.getUpTime() == null? "" : bean.getUpTime());            
	            row.createCell(4).setCellValue(bean.getDownFailTime() == null? "" : bean.getDownFailTime());            
	            row.createCell(5).setCellValue(bean.getDownFailStatus() == null? "" : bean.getDownFailStatus());            
	            row.createCell(6).setCellValue(bean.getUpFailTime() == null? "" : bean.getUpFailTime());            
	            row.createCell(7).setCellValue(bean.getUpFailStatus() == null? "" : bean.getUpFailStatus());            
	            row.createCell(8).setCellValue(bean.getStatus() == null? "" : bean.getStatus());            
	         }
			//写到磁盘
	        
	        workbook.write(fos);
			
	        
//	        response.setContentType("application/vnd.ms-excel");
//			response.setHeader("Content-Disposition", "attachment; filename="+URLEncoder.encode("Report-"+ sdf2.format(entity.getCheckTime()), "UTF-8")+".xlsx");
//			workbook.write(response.getOutputStream());	
			
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			closeScanner(scanner);
			if(fos != null){
				fos.close();
			}
		}


	}
	
	
	
	public static TBData getPageData(String tableName,  String rowKey,String status,String checkTimeStart,String checkTimeEnd, Integer currentPage, Integer pageSize)
			throws IOException {
		//List<Map<String, String>> mapList  = new LinkedList<Map<String, String>>();
		List<HbaseOtaPreRecordEntity> beanList  = new LinkedList<HbaseOtaPreRecordEntity>();
		
		ResultScanner scanner = null;
		// 为分页创建的封装类对象，下面有给出具体属性
		TBData tbData = null;
		try {
			// 默认显示第一页，每页20条
			if (pageSize == null || pageSize == 0L){
				pageSize = 20; 
			}
			if (currentPage == null || currentPage == 0){
				currentPage = 1;
			}

			// 计算起始页和结束页
			Integer firstPage = (currentPage - 1) * pageSize;
			Integer endPage = firstPage + pageSize;

			// 从表池中取出HBASE表对象
			HTableInterface table = getTable(tableName);
			// 获取筛选对象
			Scan scan = new Scan();
			//给筛选对象放入过滤器(true标识分页)
			FilterList fl = packageFilters(true,rowKey,status,checkTimeStart,checkTimeEnd);
			scan.setFilter(fl);
			// 缓存1000条数据
			scan.setCaching(1000);
			scan.setCacheBlocks(false);
			scanner = table.getScanner(scan);
			int i = 0;
			List<byte[]> rowList = new LinkedList<byte[]>();//获得所有的rowKey的二进制码
			// 遍历扫描器对象， 并将需要查询出来的数据row key取出
			for (Result result : scanner) {//result中只有rowKey和check_time,没有其他的列,所以不能在这里封装所有的结果
				String row = toStr(result.getRow());
				if (i >= firstPage && i < endPage) {
					rowList.add(getBytes(row));
				}
				i++;
			}
			// 获取取出的row key的GET对象
			List<Get> getList = getList(rowList);
			Result[] results = table.get(getList);
			// 遍历结果
			for (Result result : results) {
				String tmpRowKey = toStr(result.getRow());
				Map<byte[], byte[]> fmap = packFamilyMap(result);
//				Map<String, String> rmap = packRowMap(fmap);
//				mapList.add(rmap);
				HbaseOtaPreRecordEntity bean = packRowMap(fmap,tmpRowKey);
				beanList.add(bean);
			}

			// 封装分页对象
			tbData = new TBData();
			tbData.setCurrentPage(currentPage);
			tbData.setPageSize(pageSize);
			tbData.setTotalCount(i);
			tbData.setTotalPage(getTotalPage(pageSize, i));
			//tbData.setResultList(mapList);
			tbData.setResultList(beanList);
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			closeScanner(scanner);
		}
		return tbData;
	}
	

	/*
	 * 获取hbase的表
	 */
	public static HTableInterface getTable(String tableName) {
		if (StringUtils.isEmpty(tableName)){
			return null;
		}else{	
			return tp.getTable(getBytes(tableName));
		}
	}

	/* 转换byte数组 */
	public static byte[] getBytes(String str) {
		if (str == null){
			str = "";
		}	
		return Bytes.toBytes(str);
	}
	/*获得总页数*/
	private static int getTotalPage(int pageSize, int totalCount) {
		int n = totalCount / pageSize;
		if (totalCount % pageSize == 0) {
			return n;
		} else {
			return ((int) n) + 1;
		}
	}
	/*封装过滤条件*/
	private static FilterList packageFilters(boolean isPage,String rowKey,String status, String checkTimeStart,String checkTimeEnd ) {
		FilterList filterList = null;
		// MUST_PASS_ALL(条件 AND) MUST_PASS_ONE（条件OR）
		filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
		//1.要求rowKey必须以什么开头(productId-deltaId-mid 开头的模糊匹配)
		//Filter filter1 = new RowFilter(CompareFilter.CompareOp.EQUAL,new BinaryPrefixComparator(rowKeyPrefix.getBytes()));
		//1.要求rowKey正则匹配
		Filter filter1 = new RowFilter(CompareFilter.CompareOp.EQUAL,new RegexStringComparator(rowKey));
		//2.要求family1-qualifier1这一列的值必须大于等于condition1(checkTime必须大于等于多少)
		SingleColumnValueFilter filter2 = new SingleColumnValueFilter(getBytes(family1), getBytes(qualifier1),CompareOp.GREATER_OR_EQUAL, getBytes(checkTimeStart));
		filter2.setFilterIfMissing(true); //如果不设置为 true，则那些不包含指定 column 的行也会返回
		//3.要求family1-qualifier1这一列的值必须小于等于condition2(checkTIme必须小于等于多少)
		SingleColumnValueFilter filter3 = new SingleColumnValueFilter(getBytes(family1), getBytes(qualifier1),CompareOp.LESS_OR_EQUAL, getBytes(checkTimeEnd));	
		filter3.setFilterIfMissing(true); //如果不设置为 true，则那些不包含指定 column 的行也会返回
		
		filterList.addFilter(filter1);
		filterList.addFilter(filter2);
		filterList.addFilter(filter3);
		
		if(status != null && ! "".equals(status)){
			//4.要求family1-qualifier8这一列的值必须等于status(按照状态过滤)
			SingleColumnValueFilter filter4 = new SingleColumnValueFilter(getBytes(family1), getBytes(qualifier8),CompareOp.EQUAL, getBytes(status));
			filter4.setFilterIfMissing(true); //如果不设置为 true，则那些不包含指定 column 的行也会返回
			filterList.addFilter(filter4);
		}


		
//		if (isPage) {//分页
//			//返回的结果集中只包含第一列的数据，那么这个过滤器能够满足你的要求。它在找到每行的第一列之后会停止扫描，从而使扫描的性能也得到了一定的提升,但是要过滤多列，就不能加这个filter
//			filterList.addFilter(new FirstKeyOnlyFilter());
//		}
		return filterList;
	}
	
	private static void closeScanner(ResultScanner scanner) {
		if (scanner != null){
			scanner.close();
		}	
	}
	
	/**
	 * 封装每行数据
	 */
	//private static Map<String, String> packRowMap(Map<byte[], byte[]> dataMap) {
	private static HbaseOtaPreRecordEntity packRowMap(Map<byte[], byte[]> dataMap,String tmpRowKey) {
		Map<String, String> map = new LinkedHashMap<String, String>();
		HbaseOtaPreRecordEntity bean = new HbaseOtaPreRecordEntity();
		for (byte[] key : dataMap.keySet()) {
			byte[] value = dataMap.get(key);
			map.put(toStr(key), toStr(value));
		}	
		if(tmpRowKey != null){bean.setRowKey(tmpRowKey);}
		if(map.get(qualifier1) != null){bean.setCheckTime(map.get(qualifier1));}
		if(map.get(qualifier2) != null){bean.setDownTime(map.get(qualifier2));}
		if(map.get(qualifier3) != null){bean.setUpTime(map.get(qualifier3));}
		if(map.get(qualifier4) != null){bean.setDownFailTime(map.get(qualifier4));}
		if(map.get(qualifier5) != null){bean.setDownFailStatus(map.get(qualifier5));}
		if(map.get(qualifier6) != null){bean.setUpFailTime(map.get(qualifier6));}
		if(map.get(qualifier7) != null){bean.setUpFailStatus(map.get(qualifier7));}
		if(map.get(qualifier8) != null){bean.setStatus(map.get(qualifier8));}
			
		//return map;
		return bean;
	}
	
	private static String toStr(byte[] bt) {
		return Bytes.toString(bt);
	}
	
	/* 根据ROW KEY集合获取GET对象集合 */
	private static List<Get> getList(List<byte[]> rowList) {
		List<Get> list = new LinkedList<Get>();
		for (byte[] row : rowList) {
			Get get = new Get(row);
			get.addColumn(getBytes(family1), getBytes(qualifier1));
			get.addColumn(getBytes(family1), getBytes(qualifier2));
			get.addColumn(getBytes(family1), getBytes(qualifier3));
			get.addColumn(getBytes(family1), getBytes(qualifier4));
			get.addColumn(getBytes(family1), getBytes(qualifier5));
			get.addColumn(getBytes(family1), getBytes(qualifier6));
			get.addColumn(getBytes(family1), getBytes(qualifier7));
			get.addColumn(getBytes(family1), getBytes(qualifier8));
			list.add(get);
		}
		return list;
	}

	/**
	 * 封装配置的所有字段列族
	 */
	private static Map<byte[], byte[]> packFamilyMap(Result result) {
		Map<byte[], byte[]> dataMap = new LinkedHashMap<byte[], byte[]>();
		dataMap.putAll(result.getFamilyMap(getBytes(family1)));
		return dataMap;
	}
}
